import os
import uuid
import subprocess
import tempfile
import tools.optscale_time as opttime
from ast import literal_eval
from datetime import datetime, timezone
from unittest.mock import patch, PropertyMock
from clickhouse_connect.driver.external import ExternalData

import mongomock
import tornado.testing
from collections import defaultdict
from tools.cloud_adapter.model import (
    InstanceResource, VolumeResource, PodResource, IpAddressResource)
import optscale_client.rest_api_client.client
import optscale_client.rest_api_client.client_v2
from rest_api.rest_api_server.models.db_base import BaseDB
from rest_api.rest_api_server.models.db_factory import DBType, DBFactory
from rest_api.rest_api_server.models.models import (
    CloudAccount, Pool, OrganizationConstraint, OrganizationConstraintTypes,
    OrganizationLimitHit)
from rest_api.rest_api_server.server import make_app
from rest_api.rest_api_server.utils import get_root_directory_path
from pymongo import UpdateMany
from contextlib import contextmanager

SECONDS_IN_DAY = 86400  # seconds per day; used to bucket timestamps into day breakdowns


class TestApiBase(tornado.testing.AsyncHTTPTestCase):
    WITH_SUBPOOLS_SIGN = '+'

    def get_app(self):
        """Build the tornado application under test, bound to the test DB."""
        host, port = '127.0.0.1', 80
        return make_app(DBType.Test, host, port)

    def setUp(self, version='v2'):
        """Start the app with every external dependency stubbed out.

        Patches the config, auth and herald clients, swaps mongo for
        mongomock, and wraps several mongo/clickhouse aggregation helpers
        with mongomock-compatible fakes defined on this class. All patches
        started here are stopped by ``patch.stopall()`` in ``tearDown``.
        """
        super().setUp()
        os.environ['ASYNC_TEST_TIMEOUT'] = '30'
        # config / auth stubs
        patch(
            'rest_api.rest_api_server.handlers.v1.base.Config').start()
        patch(
            'rest_api.rest_api_server.handlers.v1.base.AuthClient.authorize',
            lambda *args: (200, {'success'})).start()
        patch('rest_api.rest_api_server.utils.Config').start()
        patch('optscale_client.config_client.client.Client.auth_url').start()
        patch('optscale_client.config_client.client.Client.restapi_url').start()
        patch('optscale_client.config_client.client.Client.public_ip').start()
        patch(
            'optscale_client.config_client.client.Client.product_name',
            return_value='OptScale',
        ).start()
        secret = self.gen_id()
        patch('optscale_client.config_client.client.Client.cluster_secret',
              return_value=secret).start()
        patch('rest_api.rest_api_server.utils._get_encryption_salt',
              return_value='test_encryption_salt').start()
        self.p_auth_users = patch(
            'rest_api.rest_api_server.controllers.employee.EmployeeController._get_auth_users',
            return_value=[]).start()
        # a fake authenticated user returned for every token
        self._user_id = self.gen_id()
        self.p_get_user_info = patch(
            'rest_api.rest_api_server.controllers.base.BaseController.get_user_info',
            return_value={'display_name': 'John', 'id': self._user_id,
                          'email': 'john@smith.com',
                          'is_password_autogenerated': False}
        ).start()
        patch(
            'rest_api.rest_api_server.controllers.base.BaseController.run_async'
        ).start()
        patch('rest_api.rest_api_server.controllers.organization.'
              'OrganizationController.create_report_subscriptions').start()
        patch('rest_api.rest_api_server.controllers.organization.'
              'OrganizationController.delete_report_subscriptions').start()
        # rest-api client routed into the in-process app through self.fetch
        http_provider = optscale_client.rest_api_client.client.FetchMethodHttpProvider(
            self.fetch, rethrow=False)
        self.client = TestApiBase.get_client(version).Client(
            http_provider=http_provider)
        self.client.token = 'token'
        self.client.secret = secret
        # in-memory mongo plus the lists backing the fake clickhouse tables
        # (see patched_execute_clickhouse)
        self.mongo_client = mongomock.MongoClient()
        self.expenses = []
        self.traffic_expenses = []
        self.ri_sp_usage = []
        self.uncovered_usage = []
        self.gemini = []
        self.raw_expenses = self.mongo_client.restapi.raw_expenses
        self.resources_collection = self.mongo_client.restapi.resources
        self.resources_collection.create_index(
            [
                ('cloud_resource_id', 1), ('cloud_resource_hash', 1),
                ('cloud_account_id', 1), ('organization_id', 1),
                ('deleted_at', 1)
            ],
            name='OptResourceUnique',
            unique=True
        )
        self.checklists_collection = self.mongo_client.restapi.checklists
        self.property_history_collection = self.mongo_client.restapi.property_history
        self.archived_recommendations_collection = self.mongo_client.restapi.archived_recommendations
        patch('rest_api.rest_api_server.controllers.base.MongoMixin.mongo_client',
              new_callable=PropertyMock, return_value=self.mongo_client
              ).start()
        self.p_send_ca_email = patch(
            'rest_api.rest_api_server.controllers.cloud_account.CloudAccountController.'
            'send_cloud_account_email').start()
        # authorization map filled by set_allowed_pair
        self.allowed_user_pool_actions = {}
        self.p_g_b_a_a_b_m = patch(
            'rest_api.rest_api_server.controllers.base.BaseController.'
            'get_bulk_allowed_action_pools_map',
            side_effect=self.get_bulk_allowed_action_pools_map).start()
        patch('rest_api.rest_api_server.controllers.employee.Config').start()
        patch(
            'rest_api.rest_api_server.controllers.employee.AuthClient.assignment_list',
            return_value=(200, [])).start()
        # mongomock-compatible replacements for aggregation pipelines that
        # use operators mongomock does not support
        patch('rest_api.rest_api_server.controllers.expense.CleanExpenseController.'
              '_aggregate_resource_data',
              wraps=self.patched_aggregate_resource_ids).start()
        patch('rest_api.rest_api_server.controllers.available_filters.'
              'AvailableFiltersController._aggregate_resource_data',
              wraps=self.patched_aggregate_resource_filters).start()
        patch('rest_api.rest_api_server.controllers.organization_constraint.'
              'FilterDetailsController._aggregate_resource_data',
              wraps=self.patched_aggregate_resource_filters).start()
        patch('rest_api.rest_api_server.controllers.breakdown_expense.'
              'BreakdownExpenseController._aggregate_resource_data',
              wraps=self.patched_aggregate_breakdown_expenses).start()
        patch(
            'rest_api.rest_api_server.controllers.resource_count.'
            'ResourceCountController._get_resources_breakdowns',
            wraps=self.patched_get_resources_breakdowns_pipeline).start()
        patch('rest_api.rest_api_server.controllers.base.ClickHouseMixin.'
              'execute_clickhouse',
              wraps=self.patched_execute_clickhouse).start()
        patch(
            'rest_api.rest_api_server.controllers.expense.ExpenseController.'
            'delete_cloud_expenses').start()
        patch(
            'rest_api.rest_api_server.controllers.cloud_account.'
            'CloudAccountController.clean_clickhouse').start()
        patch(
            'rest_api.rest_api_server.controllers.organization.'
            'OrganizationController.clean_clickhouse').start()
        patch('rest_api.rest_api_server.controllers.base.'
              'BaseController.publish_activities_task').start()
        # token metadata mock; switch_user/_mock_auth_user change it later
        self.p_get_meta_by_token = patch(
            'rest_api.rest_api_server.handlers.v1.base.BaseAuthHandler.'
            'get_meta_by_token', return_value={
                'user_id': self._user_id,
                'valid_until': opttime.utcnow_timestamp() * 2
            }).start()
        patch('rest_api.rest_api_server.controllers.base.BaseController.'
              'assign_role_to_user').start()
        self.mock_email_send()

    def set_allowed_pair(self, auth_user_id, pool_id, actions=None):
        if auth_user_id not in self.allowed_user_pool_actions:
            self.allowed_user_pool_actions[auth_user_id] = {}
        action_pools = self.allowed_user_pool_actions[auth_user_id]
        if actions is None:
            actions = ['MANAGE_RESOURCES', 'MANAGE_OWN_RESOURCES']
        for action in actions:
            if action not in action_pools:
                action_pools[action] = []
            action_pools[action].append(pool_id)

    def get_bulk_allowed_action_pools_map(self, user_ids, actions):
        result = {}
        for user_id in user_ids:
            user_action_pools = self.allowed_user_pool_actions.get(
                user_id, {}).copy()
            for action in actions:
                if action not in user_action_pools:
                    user_action_pools[action] = []

            result[user_id] = user_action_pools
        return result

    @staticmethod
    def get_publish_activity_tuple(org_id, object_id, object_type, action,
                                   meta, routing_key=None):
        if routing_key is None:
            routing_key = '{object_type}.{action}'.format(
                object_type=object_type, action=action)
        return org_id, object_id, object_type, action, meta, routing_key

    def cloud_resource_create(self, cloud_account_id, params,
                              set_allowed=True):
        """Create a cloud resource via API, pre-authorizing its
        (employee, pool) pair when both are present in params."""
        pool_id = params.get('pool_id')
        employee_id = params.get('employee_id')
        if set_allowed and pool_id is not None and employee_id is not None:
            _, employee = self.client.employee_get(employee_id)
            self.set_allowed_pair(employee['auth_user_id'], pool_id)
        return self.client.cloud_resource_create(cloud_account_id, params)

    def mock_email_send(self):
        """Disable outgoing emails: empty recipient, mocked herald client."""
        self.p_optscale_email_recipient = patch(
            'optscale_client.config_client.client.Client.optscale_email_recipient',
            return_value='').start()
        for target in (
                'optscale_client.config_client.client.Client.herald_url',
                'optscale_client.herald_client.client_v2.Client.email_send'):
            patch(target).start()

    def mock_email_send_enable(self):
        """Enable emails toward a test recipient; returns the email_send
        mock so tests can assert on sent messages."""
        self.p_optscale_email_recipient.return_value = 'john@smith.com'
        email_send = patch(
            'optscale_client.herald_client.client_v2.Client.email_send')
        return email_send.start()

    def environment_resource_create(self, organization_id, params,
                                    set_allowed=True):
        """Create an environment resource via API, pre-authorizing its
        (employee, pool) pair when both are present in params."""
        pool_id = params.get('pool_id')
        employee_id = params.get('employee_id')
        if set_allowed and pool_id is not None and employee_id is not None:
            _, employee = self.client.employee_get(employee_id)
            self.set_allowed_pair(employee['auth_user_id'], pool_id)
        return self.client.environment_resource_create(
            organization_id, params)

    def cloud_resource_create_bulk(self, cloud_account_id, params,
                                   behavior='error_existing',
                                   return_resources=False, set_allowed=True,
                                   is_report_import=False):
        """Create resources in bulk via API.

        When set_allowed is True, every (employee, pool) pair found in the
        payload (employee_id falling back to owner_id) is first registered
        through set_allowed_pair so authorization checks pass.
        """
        if set_allowed:
            # collect pools per employee; defaultdict avoids the manual
            # "if key missing" bookkeeping
            employee_pool_map = defaultdict(set)
            for resource in params['resources']:
                employee_id = resource.get('employee_id',
                                           resource.get('owner_id'))
                pool_id = resource.get('pool_id')
                if employee_id is not None and pool_id is not None:
                    employee_pool_map[employee_id].add(pool_id)
            for employee_id, pools in employee_pool_map.items():
                _, employee = self.client.employee_get(employee_id)
                for pool_id in pools:
                    self.set_allowed_pair(employee['auth_user_id'], pool_id)

        return self.client.cloud_resource_create_bulk(
            cloud_account_id, params, behavior, return_resources,
            is_report_import)

    @staticmethod
    def get_client(version="v2"):
        """Return the rest-api client module for the requested API version
        (None for unknown versions)."""
        clients = {"v2": optscale_client.rest_api_client.client_v2}
        return clients.get(version)

    def tearDown(self):
        """Drop the test DB, stop all patches and remove the sqlite file."""
        DBFactory.clean_type(DBType.Test)
        patch.stopall()
        try:
            os.remove('db.sqlite')
        except FileNotFoundError:
            pass
        super().tearDown()

    def init_db_session(self):
        """Open a fresh SQLAlchemy session bound to the test engine."""
        engine = DBFactory(DBType.Test, None).db.engine
        return BaseDB.session(engine)()

    def _make_controller(self, controller_class):
        """Instantiate a controller with a fresh session on the test engine."""
        engine = DBFactory(DBType.Test, None).db.engine
        session = BaseDB.session(engine)()
        return controller_class(session, None, engine)

    @staticmethod
    def gen_id():
        return str(uuid.uuid4())

    def create_cloud_account(self, organization_id, config, account_id=None,
                             update_discovery_info=True, auth_user_id=None,
                             warnings=None):
        """Create a cloud account via API with credential validation mocked.

        Returns the (status_code, cloud_account) pair from the API call.
        On success, optionally stamps last_discovery_at on every discovery
        info of the new account.
        """
        adapter_map = {
            'aws_cnr': 'aws.Aws',
            'azure_cnr': 'azure.Azure',
            'gcp_cnr': 'gcp.Gcp',
            'nebius': 'nebius.Nebius',
        }
        account_id = account_id or self.gen_id()
        validation_mock = None
        adapter = adapter_map.get(config.get('type'))
        if adapter:
            # make credential validation succeed without touching the cloud
            validation_mock = patch(
                'tools.cloud_adapter.clouds.%s.validate_credentials' % adapter,
                return_value={'account_id': account_id,
                              'warnings': warnings or []})
            validation_mock.start()
        if auth_user_id:
            patch('rest_api.rest_api_server.controllers.base.BaseController.'
                  'get_user_id',
                  return_value=auth_user_id).start()
        patch(
            'rest_api.rest_api_server.controllers.assignment.'
            'AssignmentController._authorize_action_for_pool',
            return_value=True).start()
        patch('rest_api.rest_api_server.controllers.report_import.'
              'ReportImportBaseController.publish_task').start()
        code, cloud_acc = self.client.cloud_account_create(
            organization_id, config)
        if code == 201 and update_discovery_info:
            _, infos = self.client.discovery_info_list(cloud_acc['id'])
            for info in infos['discovery_info']:
                self.client.discovery_info_update(
                    info['id'],
                    {'last_discovery_at': opttime.utcnow_timestamp()})
        if adapter:
            validation_mock.stop()
        return code, cloud_acc

    def verify_error_code(self, resp, code):
        """Assert that resp is an error response with the expected
        error_code."""
        # assertIn gives a more useful failure message than
        # assertTrue('x' in d)
        self.assertIn('error', resp, 'No error in response')
        self.assertIn('error_code', resp['error'], 'No error code in error')
        self.assertEqual(code, resp['error']['error_code'],
                         msg='Unexpected error code')

    @staticmethod
    def _to_discovered_resource(cad_resource, first_seen=None):
        """Convert a cloud-adapter resource object into the payload dict
        accepted by the bulk resource-create API."""
        type_names = {
            InstanceResource: 'Instance',
            VolumeResource: 'Volume',
            PodResource: 'K8s Pod',
            IpAddressResource: 'IP Address'
        }
        model = type(cad_resource)
        # keep only the non-None model fields (meta fields excluded)
        field_values = (
            (field, getattr(cad_resource, field))
            for field in model().fields(meta_fields_incl=False))
        obj = {field: value for field, value in field_values
               if value is not None}
        obj.pop('resource_id', None)
        obj['meta'] = cad_resource.meta
        obj['resource_type'] = type_names.get(model)
        obj['last_seen'] = opttime.utcnow_timestamp()
        if first_seen is not None:
            obj['first_seen'] = first_seen
        obj['active'] = True
        return obj

    def resource_discovery_call(self, resources, create_resources=True,
                                first_seen=None):
        """Simulate a discovery run: bulk-create the given cloud-adapter
        resources per cloud account and bump last_discovery_at on every
        discovery info. Returns the created resources."""
        discovery_start = opttime.utcnow_timestamp()
        payloads_map = {}
        for cad_resource in resources:
            payload = self._to_discovered_resource(cad_resource, first_seen)
            payloads_map.setdefault(
                payload['cloud_account_id'], []).append(payload)

        created = []
        for cloud_acc_id, payload in payloads_map.items():
            if create_resources:
                code, response = self.cloud_resource_create_bulk(
                    cloud_acc_id, {'resources': payload},
                    behavior='update_existing', return_resources=True)
                self.assertEqual(code, 200)
                created.extend(response['resources'])

            _, info_list = self.client.discovery_info_list(cloud_acc_id)
            for info in info_list.get('discovery_info', []):
                self.client.discovery_info_update(
                    info['id'], {'last_discovery_at': discovery_start})
        return created

    @staticmethod
    def export_id_from_link(link):
        return link.rsplit('/', 1)[1]

    def _make_resources_active(self, resource_ids):
        """Mark the given resources as active, last seen one second ago."""
        last_seen = opttime.utcnow_timestamp() - 1
        update = UpdateMany(
            filter={'_id': {'$in': resource_ids}},
            update={'$set': {'last_seen': last_seen, 'active': True}},
        )
        self.resources_collection.bulk_write([update])

    def _make_resources_inactive(self, resource_ids):
        """Mark the given resources as inactive."""
        update = UpdateMany(
            filter={'_id': {'$in': resource_ids}},
            update={'$set': {'active': False}},
        )
        self.resources_collection.bulk_write([update])

    def update_default_owner_for_pool(self, org_pool_id, owner_id):
        """Directly set default_owner_id for a pool, bypassing the API."""
        session = self.init_db_session()
        pool_query = session.query(Pool).filter(Pool.id == org_pool_id)
        pool_query.update({'default_owner_id': owner_id})
        session.commit()

    def patched_aggregate_resource_ids(self, match_query, **kwargs):
        """Mongomock-compatible replacement for the clean-expense resource
        aggregation: groups matched resource ids by cloud account, cluster
        and first_seen day."""
        group_stage = {
            '_id': {
                'cloud_account_id': {'$ifNull': ['$cloud_account_id', None]},
                'cluster_id': {'$ifNull': ['$cluster_id', None]},
                # use the module constant instead of a magic 86400
                'day': {'$trunc': {
                    '$divide': ['$first_seen', SECONDS_IN_DAY]}},
            },
            'resources': {'$addToSet': '$_id'},
        }
        return self.resources_collection.aggregate([
            {'$match': match_query},
            {'$group': group_stage}
        ], allowDiskUse=True)

    def patched_aggregate_breakdown_expenses(self, match_query, **kwargs):
        """Mongomock-compatible replacement for the breakdown-expense
        resource aggregation: groups resource ids by cloud account, cluster,
        environment flag, first_seen day and an optional extra
        ``breakdown_by`` field."""
        group_by = kwargs.get('breakdown_by')
        group_dict = {
            'cloud_account_id': {'$ifNull': ['$cloud_account_id', None]},
            'cluster_id': {'$ifNull': ['$cluster_id', None]},
            'is_environment': {'$ifNull': ['$is_environment', False]},
            # use the module constant instead of a magic 86400
            'day': {'$trunc': {
                '$divide': ['$first_seen', SECONDS_IN_DAY]}},
        }
        if group_by and group_by not in group_dict:
            group_dict[group_by] = {'$ifNull': ['$%s' % group_by, None]}
        group_stage = {
            '_id': group_dict,
            'resources': {'$addToSet': '$_id'},
        }
        return self.resources_collection.aggregate([
            {'$match': match_query},
            {'$group': group_stage}
        ], allowDiskUse=True)

    def patched_aggregate_resource_filters(self, match_query, **kwargs):
        """Mongomock-compatible replacement for the available-filters
        resource aggregation.

        Groups non-cluster resources by (cloud_account_id, cluster_type_id,
        is_environment, first_seen day) and collects per-group filter
        values, tag/meta keys and the recommendations/constraint_violated/
        active flags.
        """
        # Original pipeline uses $objectToArray. Mongomock does not support
        # this expression so we mock it and replace with changed pipeline with
        # same logic. Looks like $objectToArray will be added soon:
        # https://github.com/mongomock/mongomock/commit/c32b94a7691cb882cbd415eb90f71ad2982b0780
        # also mongomock cannot add to set False value
        last_recommend_run = kwargs['last_recommend_run']
        match_query['$and'].append({'cluster_id': None})
        collected_filters = [
            'service_name', 'pool_id', 'employee_id', 'k8s_node', 'region',
            'resource_type', 'k8s_namespace', 'k8s_service', 'cloud_account_id'
        ]
        group_stage = {
            f: {'$addToSet': {'$ifNull': ['$%s' % f, None]}}
            for f in collected_filters
        }
        group_stage.update({
            '_id': {
                'cloud_account_id': {'$ifNull': ['$cloud_account_id', None]},
                'cluster_type_id': {'$ifNull': ['$cluster_type_id', None]},
                'is_environment': {'$ifNull': ['$is_environment', None]},
                # module constant instead of a magic 86400
                'day': {'$trunc': {
                    '$divide': ['$first_seen', SECONDS_IN_DAY]}},
            },
            'resources': {'$push': '$$ROOT'}
        })
        res = list(self.resources_collection.aggregate([
            {'$match': match_query},
            {'$group': group_stage}
        ]))
        result = []
        for r_set in res:
            result_part = {'_id': r_set['_id']}
            params = defaultdict(set)
            for r in r_set['resources']:
                params['cloud_resource_ids'].add(r['cloud_resource_id'])
                for k in collected_filters:
                    params[k].add(r.get(k))
                # updating a set with a dict adds its keys
                params['tags'].update(r.get('tags', {}))
                params['meta'].update(r.get('meta', {}))
                # recommendations are fresh only if produced by the latest
                # checklist run
                rec = r.get('recommendations')
                params['recommendations'].add(
                    bool(rec and rec['run_timestamp'] >= last_recommend_run))
                params['constraint_violated'].add(
                    bool(r.get('constraint_violated')))
                params['active'].add(bool(r.get('active')))
                result_part.update(params)
            result.append(result_part)
        return result

    def patched_get_resources_breakdowns_pipeline(
            self, match_query, breakdown_by, start_date, end_date,
            collected_filters):
        """Mongomock-compatible replacement for the resource-count
        breakdowns pipeline.

        Returns a one-element list with a $facet-style document containing
        the distinct values of each collected filter, the total count and
        per-day count/created/deleted_day_before/average breakdowns grouped
        by ``breakdown_by``.
        """
        # Original pipeline uses $objectToArray in collecting tags. Mongomock
        # does not support this expression so we mock it and replace with
        # changed pipeline with the same logic.
        seconds_in_day = 86400
        match_stage = {
            '$match': match_query
        }
        # truncate first_seen/last_seen down to the start of their day
        add_stage = {
            '$addFields': {
                'first_breakdown': {
                    '$subtract': [
                        '$first_seen', {
                            '$mod': ['$first_seen', seconds_in_day]}]
                },
                'last_breakdown':
                    {
                        '$subtract': [
                            '$last_seen', {
                                '$mod': ['$last_seen', seconds_in_day]}
                        ]},
                'cloud_account_id': {
                    '$ifNull': ['$cloud_account_id', None]},
            }
        }

        def get_breakdown_dates(start_date, end_date):
            # UTC day-start timestamps covering [start_date, end_date]
            first_breakdown = int(datetime.fromtimestamp(start_date).replace(
                hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc
            ).timestamp())
            last_breakdown = int(datetime.fromtimestamp(end_date).replace(
                hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc
            ).timestamp())
            return [x for x in range(first_breakdown, last_breakdown + 1,
                                     seconds_in_day)]

        # one facet per collected filter with its distinct values
        facet_stage = {'$facet': {}}
        facet_stage['$facet'].update({
            f: [{'$project': {'_id': 1, f: '$%s' % f}},
                {'$group': {'_id': None, f: {'$addToSet': '$%s' % f}}}]
            for f in collected_filters})
        facet_stage['$facet'].update({'total': [{'$count': 'count'}]})
        breakdowns = get_breakdown_dates(start_date, end_date)
        if breakdown_by == 'resource_type':
            # mongomock doesn't support grouping as in original pipeline
            group_value = {
                'resource_type': {'$ifNull': ['$resource_type', None]},
                'is_cluster': {'$ifNull': ['$cluster_type_id', None]},
                'is_environment': {'$ifNull': ['$is_environment', None]}
            }
        else:
            group_value = '$%s' % breakdown_by

        # per-day counters: resources alive on day b ...
        brkdwns = {'%s' % b: {'$sum': {'$cond': [{'$and': [
            {'$lte': ['$first_breakdown', b]},
            {'$gte': ['$last_breakdown', b]}]}, 1, 0]}} for b in breakdowns}
        # ... first seen exactly on day b (skipping the first day) ...
        brkdwns_created = {'%s_crt' % b: {'$sum': {'$cond': [{'$and': [
            {'$eq': ['$first_breakdown', b]},
            {'$ne': [b, breakdowns[0]]}]}, 1, 0]}} for b in breakdowns}
        # ... and last seen on the day before b (skipping the first day)
        brkdwns_removed = {'%s_rmv' % b: {'$sum': {'$cond': [{'$and': [
            {'$eq': ['$last_breakdown', b - SECONDS_IN_DAY]},
            {'$ne': [b, breakdowns[0]]}]}, 1, 0]}} for b in breakdowns}

        facet_stage['$facet'].update({'breakdowns': [
            {'$group': {'_id': group_value, **brkdwns, **brkdwns_created,
                        **brkdwns_removed}},
            {'$project': {
                '_id': '$_id',
                'breakdowns': {
                    'count': {'%s' % b: '$%s' % b for b in breakdowns},
                    'created': {'%s' % b: '$%s_crt' % b
                                for b in breakdowns},
                    'deleted_day_before': {'%s' % b: '$%s_rmv' % b
                                           for b in breakdowns},
                    'average': {'$divide': [{'$sum': ['$%s' % b
                                                      for b in breakdowns]},
                                            len(breakdowns)]}
                }}}]})
        facet_stage['$facet'].update({'totals': [{
            '$group': {'_id': group_value, 'count': {'$sum': 1}}}, {
            '$project': {'_id': '$_id', 'count': '$count'}
        }]})

        pipeline = [
            match_stage,
            add_stage,
            facet_stage
        ]
        raw_data = self.resources_collection.aggregate(pipeline,
                                                       allowDiskUse=True)
        result = list(raw_data)
        # drop zero-valued counters from each breakdown group
        for br in result[0]['breakdowns']:
            for k in list(br.keys()):
                if br[k] == 0:
                    del br[k]
        return result

    def patched_execute_clickhouse(self, query, **kwargs):
        params = kwargs.pop('parameters', {})
        external_tables = kwargs.pop('external_data', ExternalData())
        if kwargs:
            self.assertEqual(kwargs, {},
                             '%s parameters is not processed in unittests'
                             % kwargs.keys())
        for k, v in params.items():
            if isinstance(v, datetime):
                v = f"'{v.replace(microsecond=0)}'"
            query = query.replace(f"%({k})s", f"{v}")
        query = ' '.join(list(filter(
            lambda x: x != '', query.replace('\n', ' ').split(' '))))

        def get_csv(path):
            return f"file('{path}/{name}.csv', 'CSV', '{structure}')"
        ch_expenses_map = {
            'expenses': (
                [
                    ('cloud_account_id', 'String', 'default'),
                    ('resource_id', 'String', 'default'),
                    ('date', 'DateTime', opttime.utcnow()),
                    ('cost', 'Float64', 0),
                    ('sign', 'Int8', 1)
                ], self.expenses
            ),
            'traffic_expenses': (
                [
                    ('cloud_account_id', 'String', 'default'),
                    ('resource_id', 'String', 'default'),
                    ('date', 'DateTime', opttime.utcnow()),
                    ('type', "Enum8('outbound' = 1, 'inbound' = 2)", 1),
                    ('from', 'String', 'default'),
                    ('to', 'String', 'default'),
                    ('usage', 'Float64', 0),
                    ('cost', 'Float64', 0),
                    ('sign', 'Int8', 1)
                ], self.traffic_expenses
            ),
            'ri_sp_usage': (
                [
                    ('cloud_account_id', 'String', 'default'),
                    ('resource_id', 'String', 'default'),
                    ('date', 'DateTime', opttime.utcnow()),
                    ('instance_type', 'String', ''),
                    ('offer_id', 'String', 'default'),
                    ('offer_type', "Enum8('ri' = 1, 'sp' = 2)", 1),
                    ('offer_cost', 'Float64', 0),
                    ('on_demand_cost', 'Float64', 0),
                    ('usage', 'Float64', 0),
                    ('ri_norm_factor', 'Float32', 0),
                    ('sp_rate', 'Float32', 0),
                    ('expected_cost', 'Float64', 0),
                    ('sign', 'Int8', 1)
                ], self.ri_sp_usage
            ),
            'uncovered_usage': (
                [
                    ('cloud_account_id', 'String', 'default'),
                    ('resource_id', 'String', 'default'),
                    ('date', 'DateTime', opttime.utcnow()),
                    ('instance_type', 'String', 'default'),
                    ('os', 'String', 'default'),
                    ('location', 'String', 'default'),
                    ('usage', 'Float64', 0),
                    ('cost', 'Float64', 0),
                    ('sign', 'Int8', 1)
                ], self.uncovered_usage
            ),
            'gemini': (
                [
                    ('id', 'String', 'default'),
                    ('tag', 'String', 'default'),
                    ('bucket', 'String', 'default'),
                    ('key', 'String', 'default'),
                    ('size', 'Integer', 1),
                    ('date', 'DateTime', opttime.utcnow()),
                ], self.gemini
            )
        }
        if 'from traffic_expenses' in query.lower():
            table_name = 'traffic_expenses'
        elif 'from ri_sp_usage' in query.lower():
            table_name = 'ri_sp_usage'
        elif 'from uncovered_usage' in query.lower():
            table_name = 'uncovered_usage'
        elif 'from gemini' in query.lower():
            table_name = 'gemini'
        else:
            table_name = 'expenses'
        expense_field_types, data_source = ch_expenses_map[table_name]
        with tempfile.TemporaryDirectory() as tmp_dir:
            for external_table in external_tables.files:
                name = external_table.name
                structure = external_table.structure
                data = external_table.data
                file_path = f'{tmp_dir}/{name}.csv'
                with open(file_path, 'wb') as csvfile:
                    csvfile.write(data)
                if f'JOIN {name}' in query:
                    query = query.replace(
                        f"JOIN {name}",
                        f"JOIN (select * from {get_csv(tmp_dir)}) as {name}")
                if f'IN {name}' in query:
                    query = query.replace(
                        f"IN {name}", f"IN (select * from {get_csv(tmp_dir)})")
            expense_fields = list(map(lambda x: x[0], expense_field_types))
            expenses_structure = ', '.join(
                [f'{x[0]} {x[1]}' for x in expense_field_types])
            if not data_source:
                data_source.append({e[0]: e[2] for e in expense_field_types})
            expenses = []
            for e in data_source:
                self.assertSetEqual(set(e.keys()), set(expense_fields),
                                    'Invalid expense inserted')
                if isinstance(e['date'], datetime):
                    e['date'] = e['date'].replace(microsecond=0, tzinfo=None)
                exp = []
                for ft in expense_field_types:
                    exp.append(str(e[ft[0]]))
                expenses.append(','.join(exp))
            expenses_str = '\\n'.join(expenses)
            ROOT_DIR = get_root_directory_path()
            clickhouse_path = f'{ROOT_DIR}/.clickhouse/clickhouse'
            command = [
                f'echo -e "{expenses_str}" |',
                f'{clickhouse_path} local',
                f'--table {table_name}',
                f'--structure "{expenses_structure}"',
                f'--input-format "CSV"',
                f'--query "{query}"'
            ]
            res = subprocess.check_output(
                ['bash', '-c', ' '.join(command)]).decode("utf-8")
        response = []

        def convert_value(value):
            """Best-effort conversion of a raw clickhouse-local cell.

            Tries int, float and ISO-format datetime in turn; the literal
            '\\N' (csv writer's placeholder for None) maps back to None;
            anything unconvertible is returned as the original string.
            """
            # Bug fix: the original compared the enclosing-scope loop
            # variable `v` instead of the `value` parameter, which only
            # worked because the sole caller happened to pass `v`.
            if value == '\\N':
                # side effect of csv writer with None values
                return None
            for converter in [int, float, datetime.fromisoformat]:
                try:
                    return converter(value)
                except ValueError:
                    pass
            return value

        for row in res.split('\n'):
            if row == '':
                continue
            try:
                t_values = (literal_eval(row),)
            except (SyntaxError, ValueError):
                t_values = tuple(row.split('\t'))
            result_row = tuple()
            for v in t_values:
                result_row += (v if isinstance(v, tuple) else convert_value(v),)
            response.append(result_row)
        return response

    @contextmanager
    def switch_user(self, user_id):
        """Temporarily authenticate the mocked token as *user_id*.

        The previously mocked user is restored on exit, even when the
        managed body raises.
        """
        previous = self.p_get_meta_by_token.return_value.get('user_id')
        self._mock_auth_user(user_id)
        try:
            yield
        finally:
            self._mock_auth_user(previous)

    def _mock_auth_user(self, user_id):
        """Point the mocked token-introspection result at *user_id*,
        with an expiry far enough in the future to never trip."""
        expires_at = opttime.utcnow_timestamp() * 2
        self.p_get_meta_by_token.return_value = {
            'user_id': user_id,
            'valid_until': expires_at,
        }

    def delete_organization(self, org_id):
        """Delete *org_id* through the API with the employee reassignment
        machinery stubbed out, asserting the call returns 204 and that
        none of the reassignment stubs were exercised."""
        patched_manager = patch(
            'rest_api.rest_api_server.controllers.employee.'
            'EmployeeController.get_org_manager_user',
            return_value=None)
        patched_assignments = patch(
            'rest_api.rest_api_server.controllers.employee.'
            'AuthClient.assignment_list',
            return_value=(200, []))
        patched_reassign = patch(
            'rest_api.rest_api_server.controllers.employee.'
            'EmployeeController._reassign_resources_to_new_owner',
            return_value=None)
        with patched_manager as p_manager, patched_assignments, \
                patched_reassign as p_reassign:
            code, _ = self.client.organization_delete(org_id)
            p_manager.assert_not_called()
            p_reassign.assert_not_called()
        self.assertEqual(code, 204)

    def update_resource_info_by_expenses(self, resource_ids):
        """Recompute cached expense aggregates for the given resources.

        Mimics diworker's behavior: for every resource id, write back
        total_cost (sign-weighted sum of its expenses), first_seen /
        last_seen (widened to cover all expense dates) and last_expense
        (date and cost of the most recent expense).
        """
        # let's imagine that this is diworker
        resources = self.resources_collection.find(
            {'_id': {'$in': resource_ids}}, ['first_seen', 'last_seen'])
        for r in resources:
            resource_expenses = [e for e in self.expenses
                                 if e['resource_id'] == r['_id']]
            total_cost = 0
            last_expense = {}
            # May be None when the stored resource lacks these fields;
            # the original code raised TypeError comparing int to None.
            min_date = r.get('first_seen')
            max_date = r.get('last_seen')
            for e in resource_expenses:
                exp_date = int(e['date'].timestamp())
                if min_date is None or exp_date < min_date:
                    min_date = exp_date
                if max_date is None or exp_date > max_date:
                    max_date = exp_date
                if not last_expense or last_expense['date'] < exp_date:
                    last_expense = {
                        'date': exp_date, 'cost': e['cost']
                    }
                total_cost += (e['cost'] * e['sign'])
            updates = {
                'total_cost': total_cost,
                'first_seen': min_date,
                'last_seen': max_date
            }
            if last_expense:
                updates.update({'last_expense': last_expense})
            self.resources_collection.update_one(
                filter={
                    '_id': r['_id']
                },
                update={'$set': updates}
            )

    def pool_id_with_subpools(self, pool_id):
        """Return *pool_id* suffixed with the with-subpools marker."""
        return ''.join((pool_id, self.WITH_SUBPOOLS_SIGN))

    @staticmethod
    def create_org_constraint(organization_id, pool_id, filters=None,
                              constraint_type=None, deleted=False):
        """Insert an OrganizationConstraint row directly into the test DB.

        Defaults to an EXPENSE_ANOMALY constraint filtered on *pool_id*;
        a definition payload matching the constraint type is generated.
        Returns the constraint as a dict with 'type' flattened to its
        string value.
        """
        db = DBFactory(DBType.Test, None).db
        engine = db.engine
        session = BaseDB.session(engine)()
        # Fix: the original re-applied this same default a second time
        # after building the definition — that branch was dead code.
        if not constraint_type:
            constraint_type = OrganizationConstraintTypes.EXPENSE_ANOMALY.value
        definitions = {
            OrganizationConstraintTypes.RESOURCE_QUOTA.value: {
                'max_value': 123},
            OrganizationConstraintTypes.EXPIRING_BUDGET.value: {
                'total_budget': 123, 'start_date': 0},
            OrganizationConstraintTypes.RECURRING_BUDGET.value: {
                'monthly_budget': 123},
        }
        # anomaly-style constraints (the default) fall through to thresholds
        definition = definitions.get(
            constraint_type, {'threshold_days': 123, 'threshold': 1})
        if not filters:
            filters = {'pool_id': [pool_id]}
        constraint = OrganizationConstraint(
            name='constr',
            type=constraint_type,
            definition=definition,
            filters=filters,
            organization_id=organization_id,
        )
        session.add(constraint)
        if deleted:
            constraint.deleted_at = opttime.utcnow_timestamp()
        session.commit()
        res = constraint.to_dict()
        res['type'] = res['type'].value
        return res

    def create_org_limit_hit(self, organization_id, pool_id, constraint_id=None,
                             created_at=None, filters=None, deleted=False):
        """Persist an OrganizationLimitHit in the test DB and return it
        as a dict; a backing constraint is created on the fly when no
        *constraint_id* is supplied."""
        if not constraint_id:
            constraint = self.create_org_constraint(organization_id, pool_id,
                                                    filters=filters)
            constraint_id = constraint['id']
        engine = DBFactory(DBType.Test, None).db.engine
        session = BaseDB.session(engine)()
        limit_hit = OrganizationLimitHit(
            organization_id=organization_id,
            constraint_id=constraint_id,
            constraint_limit=31.11,
            value=91.37
        )
        if created_at:
            limit_hit.created_at = created_at
        session.add(limit_hit)
        if deleted:
            limit_hit.deleted_at = opttime.utcnow_timestamp()
        session.commit()
        return limit_hit.to_dict()

    @staticmethod
    def get_all_org_cloud_accounts(organization_id):
        """Fetch every non-deleted CloudAccount that belongs to
        *organization_id* straight from the test database."""
        database = DBFactory(DBType.Test, None).db
        session = BaseDB.session(database.engine)()
        query = session.query(CloudAccount).filter(
            CloudAccount.organization_id == organization_id,
            CloudAccount.deleted_at == 0)
        return list(query.all())
